#!/bin/sh

. "$SPEC_PATH/abstract/backup"

# Prints an extra hash for this spec by running the `path_hash` command of the
# project's build-functions.sh on the shared "abstract/backup" file that this
# spec sources.
#
# Globals read: SHELL, PROJECT_PATH, SPEC_PATH.
# Outputs:      whatever `build-functions.sh path_hash <path>` writes.
# Returns:      the exit status of that script invocation.
e2e_test_extra_hash() {
  # The hashing script receives the file path relative to the project root.
  # Positional parameters are function-local, so they serve as scratch space
  # without introducing a new global variable.
  set -- "$(realpath --relative-to "$PROJECT_PATH" "$SPEC_PATH/abstract/backup")"

  "$SHELL" "$PROJECT_PATH/stackgres-k8s/ci/build/build-functions.sh" path_hash "$1"
}

# Hook that unconditionally succeeds (exit status 0) and prints nothing.
# NOTE(review): presumably queried by the e2e runner to decide whether this
# spec must run with an exclusive lock -- confirm against the runner.
e2e_exclusive_lock() {
  return 0
}

# Prepares the environment for this spec: operator, CloudEvents sink image,
# Knative, a one-instance cluster and a curl pod.
#
# Does nothing (besides printing a skip message) when the major.minor part of
# K8S_VERSION is lower than 1.24.
#
# Globals read:    K8S_VERSION, SPEC_NAME, SPEC_PATH, OPERATOR_NAMESPACE,
#                  CLUSTER_NAME, CLUSTER_NAMESPACE.
# Globals written: STREAM_NAME.
# All helper functions called here are defined outside this file.
e2e_test_install() {
  # Encode both versions as concatenated, zero-padded 5-digit groups
  # (1.24 -> 0000100024) so a single integer comparison orders them.
  # Positional parameters are function-local, so no global is introduced.
  set -- \
    "$(echo "$K8S_VERSION" | tr . '\n' | head -n 2 | xargs -I @ printf '%05d' @)" \
    "$(echo "1.24" | tr . '\n' | xargs -I @ printf '%05d' @)"

  if [ "$1" -lt "$2" ]; then
    echo "Skip $SPEC_NAME for Kubernetes older than 1.24"
    return
  fi

  # Recreate the operator namespace from scratch, then install the operator.
  k8s_cleanup_namespace "$OPERATOR_NAMESPACE"
  k8s_async_cleanup
  kubectl create namespace "$OPERATOR_NAMESPACE"
  install_operator_only

  # Name of the SGStream resource that the test function creates later on.
  STREAM_NAME="$(get_sgstreams_name "$SPEC_NAME-operation")"

  # Build the CloudEvents receiver image from this spec's files and make it
  # available inside the Kubernetes cluster.
  docker build \
    -f "$SPEC_PATH/$SPEC_NAME.files/Dockerfile" \
    -t dev.local/cloudevents-nodejs \
    "$SPEC_PATH/$SPEC_NAME.files"
  load_image_k8s dev.local/cloudevents-nodejs

  install_knative

  create_or_replace_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE" 1
  deploy_curl_pod "$CLUSTER_NAMESPACE"

  # NOTE(review): 2 presumably stands for the cluster pod plus the curl pod.
  wait_pods_running "$CLUSTER_NAMESPACE" 2
  wait_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE"
}

# Runs the single test of this spec through `run_test` (defined outside this
# file), unless the major.minor part of K8S_VERSION is lower than 1.24, in
# which case only a skip message is printed.
#
# Globals read: K8S_VERSION, SPEC_NAME.
e2e_test() {
  # Encode both versions as concatenated, zero-padded 5-digit groups
  # (1.24 -> 0000100024) so a single integer comparison orders them.
  # Positional parameters are function-local, so no global is introduced.
  set -- \
    "$(echo "$K8S_VERSION" | tr . '\n' | head -n 2 | xargs -I @ printf '%05d' @)" \
    "$(echo "1.24" | tr . '\n' | xargs -I @ printf '%05d' @)"

  if [ "$1" -lt "$2" ]; then
    echo "Skip $SPEC_NAME for Kubernetes older than 1.24"
    return
  fi

  run_test "Checking that stream is working" check_stream_is_working
}

# Checks end to end that an SGStream forwards the changes of an SGCluster to
# a Knative service as CloudEvents over HTTP.
#
# Steps:
#   1. Create the cluster-local Knative Service `cloudevents-nodejs` that
#      acts as the event receiver.
#   2. Create and populate two tables: `test` and `complex`. `complex` has
#      one column per visible, non-array, non-pseudo type found in pg_type.
#   3. Create the SGStream with the cluster as source and the Knative
#      service as CloudEvent/HTTP target.
#   4. Assert on the SGStream status: snapshot completed and row counts.
#   5. Insert more rows and assert on the streaming and sent-events counters.
#
# Globals read: CLUSTER_NAMESPACE, CLUSTER_NAME, STREAM_NAME, LOG_PATH.
# Helpers used (defined outside this file): knative_url, wait_until,
# success, fail.
# Side effects: writes init-tables.sql, sgstream-working.yaml and
# insert-tables.sql under LOG_PATH and creates Kubernetes resources.
check_stream_is_working() {
  # Event receiver. imagePullPolicy Never: the dev.local image is not pulled
  # from a registry.
  cat << EOF | kubectl create -f -
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: cloudevents-nodejs
  namespace: $CLUSTER_NAMESPACE
  labels:
    networking.knative.dev/visibility: cluster-local
spec:
  template:
    metadata:
      labels:
        group: stream-to-cloudevent
    spec:
      containers:
        - image: dev.local/cloudevents-nodejs
          imagePullPolicy: Never
EOF

  # Initial data set: 3 rows in `test` and 3 rows in `complex`.
  # The heredoc delimiter is quoted so that the shell leaves the `$$` and
  # `$tag$` dollar-quoting of the SQL untouched.
  # create_complex_table() and insert_complex(i) are generated dynamically
  # from the type catalog; insert_complex supplies one sample literal per
  # column type.
  cat << 'EOF' | tee "$LOG_PATH/init-tables.sql" | kubectl exec -i -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME-0" -c postgres-util -- psql -q -v ON_ERROR_STOP=on
CREATE TABLE test(i bigint, PRIMARY KEY(i));

INSERT INTO test SELECT * FROM generate_series(1, 3);

DO $$BEGIN
EXECUTE 'CREATE FUNCTION create_complex_table() RETURNS void AS $sql$CREATE TABLE complex(i bigint,'
  || (SELECT string_agg(col_def, ', ') FROM (
    SELECT quote_ident('c_' || t.typname) || ' ' || pg_catalog.format_type(t.oid, NULL) AS col_def
  FROM pg_catalog.pg_type t LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
  WHERE (t.typrelid = 0 OR (SELECT c.relkind = 'c' FROM pg_catalog.pg_class c WHERE c.oid = t.typrelid))
  AND NOT EXISTS(SELECT 1 FROM pg_catalog.pg_type el WHERE el.oid = t.typelem AND el.typarray = t.oid)
  AND pg_catalog.pg_type_is_visible(t.oid)
  AND t.typname NOT LIKE 'pg_%'
  AND t.typtype NOT IN ('p')
  AND t.typcategory NOT IN ('U','Z')
  ORDER BY t.typname) _) || ', PRIMARY KEY (i))' || '$sql$ LANGUAGE sql';
END$$;

DO $$BEGIN
EXECUTE 'CREATE FUNCTION insert_complex(i bigint) RETURNS void LANGUAGE plpgsql AS $plpgsql$BEGIN EXECUTE $insert$INSERT INTO complex SELECT $insert$ || i || $insert$, '
    || (SELECT string_agg(col_def, ', ')
      FROM (
        SELECT (
          CASE
            WHEN t.typcategory IN ('I') THEN '''1.2.3.4'''
            WHEN t.typcategory IN ('D') THEN '''' || NOW() || ''''
            WHEN t.typname = 'datemultirange' THEN '''{(,)}'''
            WHEN t.typname = 'daterange' THEN '''(,)'''
            WHEN t.typname = 'int4multirange' THEN '''{[1,2), [3,4)}'''
            WHEN t.typname = 'int4range' THEN '''[2,4)'''
            WHEN t.typname = 'int8multirange' THEN '''{[4,12)}'''
            WHEN t.typname = 'int8range' THEN '''(3,7)'''
            WHEN t.typname = 'nummultirange' THEN '''{[1.1,2.2)}'''
            WHEN t.typname = 'numrange' THEN '''(1.1,2.2)'''
            WHEN t.typname IN ('tsmultirange', 'tstzmultirange') THEN '''{[2011-01-01,2011-03-01)}'''
            WHEN t.typname IN ('tsrange', 'tstzrange') THEN '''[2011-01-01,2011-03-01)'''
            WHEN t.typname = 'dblink_pkey_results' THEN '''(1,2)'''
            WHEN t.typname = 'line' THEN '''{1,2,3}'''
            WHEN t.typname = 'point' THEN '''(1,2)'''
            WHEN t.typname = 'circle' THEN '''<(1,2),3>'''
            WHEN t.typname IN ('lseg','box','path','polygon') THEN '''((1,2),(3,4))'''
            WHEN t.typcategory IN ('Z') THEN '''t'''
            WHEN t.typtype IN ('r','m') OR t.typcategory IN ('A') THEN 'array[]'
            WHEN t.typcategory IN ('N','V','T') THEN '''1'''
            ELSE '''t''' END)
    || '::' || pg_catalog.format_type(t.oid, NULL) AS col_def
  FROM pg_catalog.pg_type t LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
  WHERE (t.typrelid = 0 OR (SELECT c.relkind = 'c' FROM pg_catalog.pg_class c WHERE c.oid = t.typrelid))
  AND NOT EXISTS(SELECT 1 FROM pg_catalog.pg_type el WHERE el.oid = t.typelem AND el.typarray = t.oid)
  AND pg_catalog.pg_type_is_visible(t.oid)
  AND t.typname NOT LIKE 'pg_%'
  AND t.typtype NOT IN ('p')
  AND t.typcategory NOT IN ('U','Z')
  ORDER BY t.typname) _) || '$insert$; END$plpgsql$;';
END$$;

SELECT create_complex_table();
SELECT insert_complex(i) FROM generate_series(1, 3) AS i; 
EOF

  # The stream under test. The heredoc delimiter is unquoted on purpose:
  # the variables and $(knative_url) are expanded by the shell.
  cat << EOF | tee "$LOG_PATH/sgstream-working.yaml" | kubectl create -f -
apiVersion: stackgres.io/v1alpha1
kind: SGStream
metadata:
  namespace: $CLUSTER_NAMESPACE 
  name: "$STREAM_NAME"
spec:
  source:
    type: SGCluster
    sgCluster:
      name: "$CLUSTER_NAME"
      debeziumProperties:
        includeUnknownDatatypes: true
  target:
    type: CloudEvent
    cloudEvent:
      format: json
      binding: http
      http:
        url: $(knative_url)
        headers:
          Host: cloudevents-nodejs.$CLUSTER_NAMESPACE
  pods:
    persistentVolume:
      size: 1Gi
EOF

  # The commands handed to `wait_until eval` are single-quoted so that the
  # whole pipeline is passed on as one literal string and evaluated later.
  if wait_until eval 'kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.snapshot.snapshotCompleted | grep -qxF true'
  then
    success "snapshot completed"
  else
    fail "snapshot did not completed"
  fi

  # The snapshot must have scanned the 3 rows initially present in each table.
  if kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq '.status.snapshot.rowsScanned["public.test"]' | grep -qxF 3
  then
    success "test table scanned"
  else
    fail "test table not scanned"
  fi

  if kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq '.status.snapshot.rowsScanned["public.complex"]' | grep -qxF 3
  then
    success "complex table scanned"
  else
    fail "complex table not scanned"
  fi

  # Changes made after the snapshot: 3 more rows per table, issued as two
  # separate statements.
  cat << 'EOF' | tee "$LOG_PATH/insert-tables.sql" | kubectl exec -i -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME-0" -c postgres-util -- psql -q -v ON_ERROR_STOP=on
INSERT INTO test SELECT * FROM generate_series(4, 6);

SELECT insert_complex(i) FROM generate_series(4, 6) AS i; 
EOF

  if wait_until eval 'kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.connected | grep -qxF true'
  then
    success "streaming started"
  else
    fail "streaming not started"
  fi

  # NOTE(review): 2 presumably matches the two statements above, each
  # committed as its own transaction -- confirm.
  if wait_until eval 'kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.numberOfCommittedTransactions | grep -qxF 2'
  then
    success "streaming transaction successful"
  else
    fail "streaming transaction failed"
  fi

  # No single quotes around the jq filter here: nested single quotes would
  # terminate the surrounding single-quoted string instead of quoting the
  # filter.
  if wait_until eval 'kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq .status.streaming.totalNumberOfCreateEventsSeen | grep -qxF 6'
  then
    success "streaming events successful"
  else
    fail "streaming events failed"
  fi

  # NOTE(review): 12 presumably is 6 snapshot rows plus 6 streamed inserts;
  # this counter is read once, without waiting -- confirm it cannot lag
  # behind the counter checked above.
  if kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq '.status.events.totalNumberOfEventsSent' | grep -qxF 12
  then
    success "sent events successful"
  else
    fail "sent events failed"
  fi

  if kubectl get sgstream -n "$CLUSTER_NAMESPACE" "$STREAM_NAME" -o json | jq '.status.events.lastEventWasSent' | grep -qxF true
  then
    success "sent last event successful"
  else
    fail "sent last event failed"
  fi
}
